package com.deepuser.load.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.deepuser.load.dao.BehaviorDao;
import com.deepuser.load.model.Behavior;
import com.deepuser.load.mongo.model.UserMog;
import com.deepuser.load.mongo.model.UserTag;
import com.deepuser.load.mongo.service.impl.UserMogServiceImpl;
import com.deepuser.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;

@Slf4j
@Component
@DependsOn("userMogService")
public class KafkaBehaviorService {

    @Resource
    BehaviorDao behaviorDao;

    @KafkaListener(topics = "${spring.kafka.consumer.gourd-topic2}",
            groupId = "${spring.kafka.consumer.group-id}" ,
            containerFactory = "kafkaListenerContainerFactory")
    public void kafkaConsumerBehivor(ConsumerRecord<String, String> record) {
        try {
            log.info("消消费消息 topic = {} , content = {}",record.topic(),record.value());
            String messageText = record.value();
            // 业务代码......
            log.info("消费到的信息==="+messageText);
            JSONObject result = null;
            try {
                result = JSON.parseObject(messageText);
            }catch (Exception e){
                log.error("kafka数据消费异常！！！");
            }
            Behavior behavior = new Behavior();
            behavior.setData(result.toJSONString());
            behavior.setActionname(result.containsKey("actionname")?result.get("actionname").toString():"");
            behavior.setAppkey(result.containsKey("appkey")?result.get("appkey").toString():"");
            behavior.setSuperId(result.containsKey("superId")?result.get("superId").toString():"");
            try{
                behavior.setTimestemp(result.containsKey("timestemp")?Long.getLong(result.get("timestemp").toString()):new Date().getTime());
            }catch (Exception e){
                log.error("Topic:user中存在数据中时间戳异常");
                behavior.setTimestemp(new Date().getTime());
            }
            log.info("生成数据：{}",behavior.toString());
            //同步步将数据写入msql，数据量不大可以这么搞
            behaviorDao.insert(behavior);

        }catch (Exception e){
            log.error("ERROR====>{}",e.getMessage());
            log.error("TOPIC:{},DATA:{},load数据异常","deepuser",record.value());
        }
    }


    @Value("#{${deepuser.idmapping}}")
    public LinkedHashMap<String, HashMap> hashMap;


    @Autowired
    UserMogServiceImpl userMogService;

    @KafkaListener(topics = "${spring.kafka.consumer.gourd-topic3}",
            groupId = "${spring.kafka.consumer.group-id}" ,
            containerFactory = "kafkaListenerContainerFactory")
    public void kafkaConsumerTag(ConsumerRecord<String, String> record) {
        try {
            log.info("消消费消息 topic = {} , content = {}",record.topic(),record.value());
            String messageText = record.value();
            // 业务代码......
            log.info("消费到的信息==="+messageText);
            JSONObject result = null;
            try {
                result = JSON.parseObject(messageText);
            }catch (Exception e){
                log.error("kafka数据消费异常！！！");
            }

            Iterator<Map.Entry<String,HashMap>> it =  hashMap.entrySet().iterator();
            HashMap<String, UserTag> hashMap = new HashMap<>();
            UserMog userMog = new UserMog();
            List<String> ll = new ArrayList<>();
            while(it.hasNext()){
                Map.Entry<String,HashMap> entry=it.next();
                if(result.containsKey(entry.getKey())){
                    ll.add(result.get(entry.getKey()).toString());
                    if(RedisUtil.existAny(result.get(entry.getKey()).toString())){
                        userMog.setSuperId(RedisUtil.get(result.get(entry.getKey()).toString()).toString());
                    }
                    if(entry.getKey().equals("phone")){
                        userMog.setPhone(result.get(entry.getKey()).toString());
                    }
                    result.entrySet().forEach(i ->{
                        UserTag userTag = new UserTag();
                        userTag.setCreateTime(new Date().getTime());
                        userTag.setName(i.getKey());
                        userTag.setValue(i.getValue().toString());
                        hashMap.put(i.getKey(),userTag);
                    });
                    userMog.setIdList(ll);
                    userMog.setHashMap(hashMap);
                    userMog.setId(UUID.randomUUID().toString());
                    log.info("loadMongo==>{}",userMog);
                    userMogService.update(userMog);
                    break;
                }
            }

        }catch (Exception e){
            log.error("ERROR====>{}",e);
            log.error("ERROR====>{}",e.getMessage());
            log.error("TOPIC:{},DATA:{},load数据异常",record.topic(),record.value());
        }
    }
}
